Module format Parquet

Apprendre à utiliser des fichiers au format Parquet avec R

SSP/DEMESIS

16/01/2025

0.1 Avant-propos


Ce support ne couvre pas tous les aspects des traitements qu’il est possible de réaliser avec les fichiers au format Parquet mais il constitue une base sur laquelle s’appuyer si vous rencontrez des fichiers Parquet sur Cerise ou ailleurs.

1 C’est quoi un fichier parquet ?

1.1 C’est quoi le format Parquet ?

  • Un nouveau format de données …

    • … qui compresse efficacement les fichiers
    • … interopérable
    • conçu pour que les données soient chargées rapidement en mémoire

1.2 Caractéristiques du format Parquet

  • Un stockage au format binaire (pas lisible par un humain)
  • Un stockage orienté colonne (par opposition aux csv qui sont orientés lignes)
  • Un fichier Parquet contient à la fois les données et des métadonnées

1.3 Avantages du format Parquet

  • Des fichiers moins volumineux qu’en csv 500 Mo en Parquet vs 5 Go en csv

  • Des requêtes plus rapides et efficaces
    Seulement les données nécessaires sont lues, pas tout le fichier

  • Des données conformes à la mise à disposition par le producteur (par exemple, plus de problème de codes communes…)
    => Un format très efficace pour l’analyse de données mais peu adapté à l’ajout de données en continu ou à la modification fréquente de données existantes.

1.4 Un format qui se démocratise

  • L’Insee diffuse des données du recensement de la population au format Parquet

  • Voir le guide d’utilisation joint pour manipuler ces données

  • Premières diffusions sur data.gouv avec les bureaux de vote, les demandes de valeurs foncières, indicateurs pénaux…)

  • Prévisualisations des fichiers Parquet possibles avec le nouvel explorateur de données du SSP Cloud ou avec avec l’outil ParquetViewer.

2 Comment utiliser/interroger un fichier parquet ?

2.1 Lire un fichier avec read_parquet()


library(arrow)    # Le package arrow est nécessaire pour travailler avec des fichiers parquet
library(dplyr)    # Pour utiliser dplyr
library(tictoc)   # Pour le benchmark


Pour l’exemple, nous allons prendre une table des eploitations du RA 2020 d’une centaine de MO qui contient 416 478 lignes et 255 colonnes.

tic()
RA2020 <- arrow::read_parquet("data/RA2020_exploitations.parquet")
toc()
> 1.14 sec elapsed

Le résultat obtenu est un objet directement utilisable dans R (ici un data.frame).

Il est possible de sélectionner les colonnes que l’on souhaite importer dans R directement dans la fonction read_parquet :

tic()
RA2020_extrait <- arrow::read_parquet("data/RA2020_exploitations.parquet",
                                      col_select = c("NOM_DOSSIER","SIEGE_REG","SAU_TOT"))
toc()
> 0.06 sec elapsed

2.2 Comparaison avec la lecture d’un fichier rds


Voyons l’écart avec la lecture d’un fichier rds :

tic()
RA2020 <- readRDS("data/RA2020_exploitations.rds")
toc()
> 6.15 sec elapsed



=> Le temps nécessaire au chargement de la table est d’environ 6 secondes !
L’écart est significatif rien que sur la lecture (X 6).

2.3 Des requêtes avec dplyr comme d’habitude

RA2020 est un data.frame : on peut donc utiliser la syntaxe dplyr :

resultat <- RA2020 |> 
  filter(SIEGE_REG == "93") |> 
  group_by(SIEGE_DEP) |> 
  summarise(total_SAU = sum(SAU_TOT, na.rm = TRUE))
  
# A tibble: 6 × 2
  SIEGE_DEP total_SAU
  <chr>         <dbl>
1 04          158946.
2 05           91979.
3 06           41141.
4 13          145713.
5 83           77785.
6 84          112888.
  • Le temps d’exécution de la requête est d’environ 9 secondes.
  • Les ressources consommées sont importantes

2.4 Lire et exploiter un fichier parquet volumineux


Voici ci-dessous la syntaxe recommandée pour requêter un fichier parquet volumineux :


# Établir la connexion aux données
RA2020 <- open_dataset("data/RA2020_exploitations.parquet")  |>
  filter(SIEGE_REG == "93") |> 
  group_by(SIEGE_DEP) |> 
  summarise(total_SAU = sum(SAU_TOT, na.rm = TRUE)) |> 
  collect()

=> Avec cette syntaxe, la requête va automatiquement utiliser les variables du fichier Parquet dont elle a besoin (en l’occurence SIEGE_REG, SIEGE_DEP et SAU_TOT) et minimiser l’occupation de la mémoire vive.


Revenons dans le détail sur cette syntaxe…

2.5 La fonction open_dataset() (1/4)


Comme la fonction read_parquet(), la fonction open_dataset() permet de lire des données stockées en format Parquet.

Le résultat obtenu avec la fonction open_dataset() n’est plus un data.frame mais un Arrow Table qui est une structure de données spécifique.


RA2020 <- open_dataset("data/RA2020_exploitations.parquet")

class(RA2020)

> [1] "FileSystemDataset" "Dataset" "ArrowObject" "R6" 

2.6 La fonction open_dataset() (2/4)

La fonction open_dataset() crée un objet qui apparaît dans Values.

L’affichage dans la console d’un Arrow Table affiche uniquement les métadonnées.

RA2020

> FileSystemDataset with 1 Parquet file
NOM_DOSSIER: string
TYPE_QUESTIONNAIRE: string
SEUIL_IFS: string
CHAMP_GEO: string
COEF_F: double
NUMSTRATE: string
STRATE: string
SIEGENAT: string
SIEGE_CODE_COM: string
SIEGE_LIEUDIT: string
SIEGE_LIEUDIT_CODE_DOM: string
SIEGE_LIB_COM: string
...

2.7 La fonction open_dataset() (3/4)

Pour afficher le contenu d’un Arrow Table, il faut d’abord le convertir en data.frame avec la fonction collect().

RA2020 <- RA2020 |> collect()

class(RA2020)

> [1] "data.frame"

# L'opération ci-dessus est à éviter pour des tables volumineuses, si besoin de visualiser la table, on préfèrera :

extrait_RA2020 <- RA2020 |> slice_head(n = 100) |> collect()


Toutefois rien ne presse car la grande différence entre manipuler un data.frame et un Arrow Table tient au moteur d’exécution :

  • Si on manipule un data.frame avec la syntaxe de dplyr, alors c’est le moteur d’exécution de dplyr qui fait les calculs

  • Si on manipule un Arrow Table avec la syntaxe de dplyr, alors c’est le moteur d’exécution d’arrow (nommé acero) qui fait les calculs. Et le moteur d’exécution d’arrow est beaucoup plus efficace et rapide

2.8 La fonction open_dataset() (4/4)


Il est recommandé de privilégier la fonction open_dataset() à la fonction read_parquet() pour au moins 2 raisons :

  • open_dataset() crée une connexion au fichier Parquet mais elle n’importe pas les données contenues dans ce fichier => une consommation de RAM moins importante !

  • open_dataset() peut se connecter à un fichier Parquet unique mais aussi à des fichiers Parquets partitionnés (voir plus loin)

2.9 L’évalutation/éxécution différée (1/4)


Cela signifie qu’arrow se contente de mémoriser les instructions, sans faire aucun calcul tant que l’utilisateur ne le demande pas explicitement.

Il existe 2 fonctions pour déclencher l’évaluation d’un traitement arrow mais qui présente des différences :

  • collect() qui renvoie le résultat du traitement sous la forme d’un data.frame/tibble
  • compute() qui renvoie le résultat du traitement sous la forme d’un Arrow Table.

La grande différence entre manipuler un tibble et manipuler un Arrow Table tient au moteur d’exécution :

  • Avec un data.frame/tibble => moteur d’exécution de dplyr
  • Avec un Arrow Table => moteur d’exécution d’arrow (nommé acero) plus efficace que celui de dplyr

Dans les traitements intermédiaires, on privilégiera la fonction compute() pour pouvoir utiliser le plus possible le moteur acero.

2.10 L’évalutation/éxécution différée (2/4)

SAU_DEP <- RA2020 |> 
  group_by(SIEGE_DEP) |> 
  summarise(total_SAU = sum(SAU_TOT, na.rm = TRUE))
class(SAU_DEP)
> [1] "arrow_dplyr_query"

resultats <- SAU_DEP |> 
  filter(SIEGE_DEP == "13") |> 
  collect()
> # A tibble: 1 × 2
  SIEGE_DEP total_SAU
  <chr>         <dbl>
1 13          145713.

Dans l’exemple ci-dessus, la première étape ne réalise aucun calcul par elle-même, car elle ne comprend ni collect() ni compute(). L’objet SAU_DEP n’est pas une table et ne contient pas de données, il contient simplement une requête (query) décrivant les opérations à mener sur la table du RA.

arrow analyse la requête avant de l’exécuter, et optimise le traitement pour minimiser le travail.
Dans notre exemple, arrow repère que la requête ne porte en fait que sur le département 13, et commence donc par filtrer les données sur le département avant de sommer la SAU les équipements, de façon à ne conserver que le minimum de données nécessaires et à ne réaliser que le minimum de calculs.

2.11 L’évalutation/éxécution différée (3/4)


L’évaluation/exécution différée est très puissante mais présente des limites.

On serait tentés d’écrire un traitement entier en mode lazy (sans aucun compute() ni collect() dans les étapes intermédiaires) et de faire un unique compute() ou collect() tout à la fin du traitement afin que toutes les opérations soient optimisées en une seule étape.


Malheureusement, le moteur acero a ses limites notamment sur des traitements trop complexes (ce qui génère des plantages de sessions R).

2.12 L’évalutation/éxécution différée (4/4)


QUELQUES CONSEILS POUR ÉLABORER LA BONNE STRATÉGIE AVEC L’ÉVALUATION DIFFÉRÉE :


  • Décomposer le traitement en plusieurs étapes puis exécuter chaque étape séparément (avec un compute())
  • Définir la bonne longueur des étapes intermédiaires en gardant en tête :
    • D’avoir des étapes de traitement qui ne dépassent pas 40 lignes de code
    • Que le séquencement des étapes soit cohérent avec l’objet du traitement
    • Plus les données sont volumineuses OU les opérations unitaires sont complexes, plus les étapes de traitement doivent être courtes/prudentes

2.13 Quelques manques sur le moteur acero (1/2)


La liste des fonctions du tidyverse supportées par acero est disponible sur cette page.

Il y a (encore) quelques grands absents, notamment :

  • pivot_wider() et pivot_longer() n’ont pas d’équivalent avec acero.

  • les empilements de plusieurs tables avec une seule fonction (bind_rows() dans dplyr).
    Avec des Arrow Tables, il faut appeler plusieurs fois ces fonctions (en l’occurence union(). Par exemple :

resultats <- table1 |>
  union(table2) |>
  union(table3) |>
  compute()

2.14 Quelques manques sur le moteur acero (2/2)


  • les window functions (ajouter à une table des informations issues d’une agrégation par groupe) comme par exemple :
res <- RA2020 |>
  group_by(SIEGE_REG) |>
  mutate(total_SAU = sum(SAU_TOT)) |>
  collect()
  
> Error: window functions not currently supported in Arrow
Call collect() first to pull data into R.

Remarque : le code ci-dessus fonctionne par contre en remplaçant le mutate() par un summarise().

2.15 Comment contourner le problème d’acero ? (1/3)


Plusieurs solutions existent :

  1. Comme suggéré par R, renoncer à manipuler les données sous forme d’Arrow Table avec le moteur acero en passant par un collect() et poursuivre le traitement avec le moteur d’exécution de dplyr (avec des performances moins importantes).


  1. Étudier le message d’erreur renvoyé par R et chercher à réécrire d’une autre façon le traitement.

2.16 Comment contourner le problème d’acero ? (2/3)


Exemple pour le point 2 issu d’utilitr :

resultats <- bpe_ens_2018_arrow |>
  group_by(DEP) |>
  summarise(
    nb_boulangeries  = sum(NB_EQUIP * (TYPEQU == "B203")),
    nb_poissonneries = sum(NB_EQUIP * (TYPEQU == "B206"))
  ) |>
  compute()
  
> ! NotImplemented: Function 'multiply_checked' has no kernel matching input types (double, bool); pulling data into R


L’erreur vient de l’opération sum(NB_EQUIP * (TYPEQU == “B203”)) : arrow ne parvient pas à faire la multiplication entre NB_EQUIP (un nombre réel) et (TYPEQU == “B203”) (un booléen).

2.17 Comment contourner le problème d’acero ? (3/3)


=> La solution est très simple: il suffit de convertir (TYPEQU == “B203”) en nombre entier avec la fonction as.integer() qui est supportée par acero.

Le code suivant peut alors être entièrement exécuté par acero:

resultats <- bpe_ens_2018_arrow |>
  group_by(DEP) |>
  summarise(
    nb_boulangeries  = sum(NB_EQUIP * as.integer(TYPEQU == "B203")),
    nb_poissonneries = sum(NB_EQUIP * as.integer(TYPEQU == "B206"))
  ) |>
  compute()

2.18 En conclusion sur le package arrow


Le package arrow présente 3 avantages majeurs :

  • Performances élevées : arrow est très efficace et très rapide pour la manipulation de données tabulaires (nettement plus performant que dplyr par exemple)

  • Usage réduit des ressources : arrow est conçu pour ne charger en mémoire que le minimum de données. Cela permet de réduire considérablement les besoins en mémoire, même lorsque les données sont volumineuses

  • Facilité d’apprentissage grâce aux approches dplyr et SQL: arrow peut être utilisé avec les verbes de dplyr (select, mutate, etc.) et/ou avec le langage SQL grâce à DuckDB (voir plus loin).

2.19 Exercice 1

Exercice 1 (premiers contacts avec un fichier parquet + rappels sur les fonctions)

  • Ouvrir le fichier parquet situé sous ~/CERISE/03-Espace-de-Diffusion/030_Structures_exploitations/3020_Recensements/RA_2020/01_BASES DIFFUSION RA2020/RA_2020_parquet/RA2020_EXPLOITATIONS_240112.parquet

  • Consulter les métadonnées de ce fichier

  • Consulter les 100 premières lignes de ce fichier

  • Récupérer dans un vecteur trié les codes régions des lieux principaux de production (SIEGE_REG)

  • Récupérer dans un vecteur trié les libellés régions des lieux principaux de production (SIEGE_LIB_REG)

  • Ecrire une fonction calculs_RA() qui - pour une région et une table donnée en entrée - conserve uniquement les lignes correspondantes selon la colonne SIEGE_REG, puis groupe la table par SIEGE_DEP et calcule la surface totale SAU (SAU_TOT), la surface totale de céréales (CEREALES_SUR_TOT) et la surface totale d’oléagineux (OLEAG_SUR_TOT) et enfin la part de la surface des cereales dans la SAU totale et la part de la surface des oléagineux dans la SAU totale.

  • Utilser ensuite la fonction calculs_RA() pour calculer ces indicateurs sur l’ensemble des régions présentes dans la table du RA2020 et stocker les résultats dans des fichiers Excel sous votre espace personnel.
    TIPS : pensez à utiliser {purrr} et {openxlsx} par exemple.

2.20 Exercice 2

Exercice 2 (collect() vs compute())

  • Dans votre espace de travail, créer les 2 fichiers parquet suivants :
data_a <- tibble(
  id = rep(1:1000000, each = 10),
  annee = rep(2016:2025, times = 1000000),
  a = sample(letters, 10000000, replace = TRUE)
)

data_b <- tibble(
  id = rep(1:1000000, each = 10),
  annee = rep(2016:2025, times = 1000000),
  b = runif(10000000, 1, 100)
)

data_c <- tibble(
  lettres = sample(letters, 10000000, replace = TRUE),
  classe = sample(c("pommes","poires","melon","fraise"), 10000000, replace = TRUE)
)

write_parquet(data_a, "data_a.parquet")
write_parquet(data_b, "data_b.parquet")
write_parquet(data_c, "data_c.parquet")

rm(data_a)
rm(data_b)
rm(data_c)
gc()
  • La suite de l’exercice sur la slide suivante…

2.21 Exercice 2 (suite)

Exercice 2 (collect() vs compute())

  1. AVEC collect()

Charger les fichiers parquet data_a et data_b sous forme de data.frame
Créer la table etape1 en réalisant une jointure à gauche de data_a avec data_b.
Charger le fichier parquet data_c sous forme de data.frame
Filtrer la table etape1 sur les années supérieures à 2020 puis faire la somme de la colonne b selon la colonne a
Ajouter le colonne classe issue de la table data_c dans le tableau final.


  1. AVEC compute()

Réaliser les mêmes traitements que A) avec des compute() et réduire le temps d’exécution.

3 Écrire des fichiers parquet

3.1 Données peu volumineuses: écrire un seul fichier Parquet (1/2)


En tant que responsable de sources, vous pouvez être amenés à écrire et déposer des fichiers Parquet, par exemple sous Cerise.

Pour cela, on utilise la fonction write_parquet().
Un 1er exemple simple à partir d’un fichier rds:

# Lecture du fichier rds
msa_ns <- readRDS("data/msa_ns_src_2023.rds")

# Écriture des données en format Parquet
write_parquet(x = msa_ns, sink = "data/msa_ns_src_2023.parquet")


3.2 Données peu volumineuses: écrire un seul fichier Parquet (2/2)


Un autre exemple un peu plus compliqué à partir de fichier csv contenu dans un zip sur internet :


# Chargement des packages
library(arrow)
library(readr)

# Téléchargement du fichier zip
download.file("https://www.insee.fr/fr/statistiques/fichier/2540004/dpt2021_csv.zip", destfile = "data/dpt2021_csv.zip") 
# Décompression du fichier zip
unzip("data/dpt2021_csv.zip", exdir = "data")

# Lecture du fichier CSV
dpt2021 <- read_delim(file = "data/dpt2021.csv")

# Écriture des données en format Parquet
write_parquet(x = dpt2021, sink = "data/dpt2021.parquet"))

3.3 Données volumineuses: écrire un fichier Parquet partitionné (1/3)


Pourquoi partitionner ?

Par définition, il n’est pas possible de charger seulement quelques lignes d’un fichier Parquet : on importe nécessairement des colonnes entières.

Lorsque le fichier Parquet est partitionné, arrow est capable de filtrer les lignes à importer à l’aide de clés departitionnement, ce qui permet d’accélérer l’importation des données.

Le partitionnement permet de travailler sur des fichiers Parquet de plus petite taille et donc de consommer moins de mémoire vive.

3.4 Données volumineuses: écrire un fichier Parquet partitionné (2/3)


Ça veut dire quoi partitionné ?

Partitionner un fichier revient à le “découper” selon une clé de partionnement (une ou plusieurs variables)

En pratique, l’ensemble des données sera stockée dans plusieurs fichiers au format Parquet.

Voici par exemple comment se présente un fichier Parquet partitionné selon les régions :

3.5 Données volumineuses: écrire un fichier Parquet partitionné (3/3)

Pour écrire des fichiers Parquet partitionnés, on utilise la fonction write_dataset().

Partitionnons notre fichier issu de la MSA par type d’exploitation et sexe :

write_dataset(
  dataset = msa_ns,
  path = "data/msa_ns",
  partitioning = c("TYPE_EXP","SEXE"), # les variables de partitionnement
  format = "parquet"
)

Voici un aperçu de l’arborescence créée (:

data/msa_ns
├── TYPE_EXP=1
│   ├── SEXE=1
│   │   └── part-0.parquet
│   └── SEXE=2
│       └── part-0.parquet
├── TYPE_EXP=2
│   ├── SEXE=1
│   │   └── part-0.parquet
│   └── SEXE=2
│       └── part-0.parquet
└── TYPE_EXP=3
    ├── SEXE=1
    │   └── part-0.parquet
    └── SEXE=2
        └── part-0.parquet
        
        

3.6 Industrialiser la conversion de vos fichiers ?

  • Le package R parquetize permet de faciliter la conversion de données au format Parquet.

  • Plusieurs formats supportés csv, json, rds, fst, SAS, SPSS, Stata, sqlite…

  • Propose des solutions de contournement pour les fichiers très volumineux.

Un exemple issu de la documentation :

Conversion from a local rds file to a partitioned parquet file :: 12
rds_to_parquet(
path_to_file = system.file("extdata","iris.rds",package = "parquetize"),
path_to_parquet = tempfile(fileext = ".parquet"),
partition = "yes",
partitioning = c("Species")
)

#> Reading data...
#> Writing data...
#> ✔ Data are available in parquet dataset under /tmp/RtmptNiaDm/file1897441ca0c0.parquet
#> Writing data...

#> Reading data...

3.7 Comment bien utiliser des fichiers partitionnés avec arrow (1/2)


La fonction open_dataset() permet d’ouvrir une connexion vers un fichier Parquet partitionné.

L’utilisation de la fonction open_dataset() est similaire au cas dans lequel on travaille avec un seul fichier Parquet.

Il y a toutefois 2 différences :

  • Le chemin indiqué n’est pas celui d’un fichier .parquet, mais le chemin d’un répertoire dans lequel se trouve le fichier Parquet partitionné Il est préférable d’indiquer le nom et le type de la ou des variable(s) de partitionnement.

3.8 Comment bien utiliser des fichiers partitionnés avec arrow (2/2)


Un exemple avec les données de la MSA :

# Établir la connexion au fichier Parquet partitionné
donnees_msa <- open_dataset(
  "data/msa_ns", # Ici, on met le chemin d'un répertoire
  hive_style = TRUE,
  partitioning = arrow::schema(TYPE_EXP = arrow::utf8(), SEXE = arrow::utf8()) # Les variables de   partitionnement
)

# Définir la requête
resultats_msa <- donnees_msa |>
  filter(TYPE_EXP == "2" & SEXE == "1") |> # Ici, on filtre selon les clés de partitionnement
  select(DEPT, RC_CHEF) |> 
  collect()

Ce qui donne :

resultats_msa

> resultats_msa
# A tibble: 62,195 × 2
   DEPT  RC_CHEF
 * <chr> <chr>  
 1 11    1461   
 2 11    2910   
 3 11    1528   
 4 11    4493   
 5 11    1490   
# ℹ 62,185 more rows
# ℹ Use `print(n = ...)` to see more rows

3.9 Conseils lors de l’utilisation de fichiers partitionnés

Afin de tirer au mieux profit du partitionnement, il est conseillé de filtrer les données de préférence selon les variables de partitionnement (dans notre exemple, TYP_EXP et SEXE).

Il est fortement recommandé de spécifier le type des variables de partitionnement avec l’argument partitioning.

Cela évite des erreurs typiques: le code du département est interprété à tort comme un nombre et aboutit à une erreur à cause de la Corse…

L’argument partitioning s’utilise en construisant un schéma qui précise le type de chacune des variables de partitionnement.

Voir cette page pour la liste des types supportés.

3.10 Dernier conseil avec arrow

Il est recommandé de définir les deux options suivantes au début de votre script.

Cela autorise arrow à utiliser plusieurs processeurs à la fois, ce qui accélère les traitements :

Autoriser arrow à utiliser plusieurs processeurs en même temps
options(arrow.use_threads = TRUE)

# Définir le nombre de processeurs utilisés par arrow
# 10 processeurs sont suffisants dans la plupart des cas
arrow:::set_cpu_count(parallel::detectCores() %/% 2)

4 Manipuler des fichiers parquets avec duckdb

4.1 En tant que responsable de sources

5 Pour en savoir plus

5.1 Conseil de lecture

Pour ceux qui veulent aller plus loin :

  • Comment bien utiliser le package arrow ? C’est par ici
  • Comment utiliser DuckDB sur des fichiers au format Parquet ? C’est par
  • Comment requêter des fichiers au format Parquet avec Python ? Lien vers la documentation.

Note interne écrite par le DEMESIS : voir ici